ZooKeeper(五)技术内幕-系统模型和序列化协议

ZooKeeper(五)技术内幕

一. 系统模型

1.1 数据模型

ZooKeeper的数据模型为 ZNode,即数据节点,可以保存数据,同事可以挂载子节点,因此构成了一个层次化的命名空间,一个树结构。

ZooKeeper中的事务指能够改变ZK服务器状态的操作,一般包括数据节点的创建和删除、数据节点内容更新和客户端会话创建与失效等操作。每个事务会被分配一个全局唯一的事务ID,即ZXID,通常是一个64位的数字,每个ZXID对应一次更新操作,而且可以间接看出ZK处理这些请求的全局顺序。

1.2 节点特性

(1)节点类型

  • 持久节点(PERSISTENT):数据节点创建后就会一直存在于ZK服务器上,直到由删除操作主动清除此节点。
  • 持久顺序节点(PERSISTENT_SEQUENTIAL):有额外的顺序性特点,每个父节点都会为它的一级子节点维护一份顺序,创建子节点时可以设置这个标记,ZK会自动给节点名后加一个数字后缀。
  • 临时节点(EPHEMERAL):此节点的生命周期与客户端会话绑定,当会话失效,节点就会被自动清理掉(非TCP连接断开),临时节点只能作为叶子节点,不能创建子节点。
  • 临时顺序节点(EPHEMERAL_SEQUENTIAL):额外有了顺序性。

(2)状态信息

通过get命令获取一个数据节点的内容:

第一行是数据内容,后面就是节点的状态信息,其实就是数据节点的Stat对象的格式化输出:

Stat类包含上一个数据节点的所有状态信息(事务ID、版本信息和子节点个数等):

1.3 版本—乐观锁保证分布式数据原子性操作

每个数据节点都有三种类似新的版本信息:

当节点创建后,version值为0,当对此节点进行更新后,更新为1,代表变更次数。通过版本号实现的乐观锁来实现在数据并发竞争不大、事务冲突较少的应用场景。

ZK的PrepRequestProcessor处理器类中,处理每个数据更新请求(setDataRequest)时,会进行如下版本检查:

1
2
3
4
5
6
7
8
version = setDataRequest.getVersion();
// 获取当前服务器上该数据的最新版本
int currentVersion = nodeRecord.stat.getVersion();
// -1表示客户端不要求使用乐观锁
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException(path);
}
version = currentVersion + 1;

1.4 Watcher

ZK通过Watcher机制来实现发布/订阅的分布式通知功能,当服务端指定事件触发了Watcher,就会向指定客户端发送一个事件通知:

1.4.1 结构

Watcher对象存储在客户端的WatchManager中,收到通知后从中取出对应的Watcher对应来执行回调逻辑。

接口类Watcher表示一个标准的事件处理器,包含KeeperState和EventType两个枚举类,分别表示通知状态和事件类型,同时定义了事件的回调方法:process(WatchedEvent event)

Watcher事件常见的通知状态和事件类型:

内容变更包括节点的数据内容和数据的版本号dataVersion,即使相同数据内容,一旦有客户端调用数据更新的接口,触发的更新一样会发送相应通知。

AuthFailed是授权失败触发,使用错误的Auth只会抛出NoAuthException异常,而使用错误的Scheme则会抛出AuthFailedException,并且收到事件通知(AuthFailed, None)。

回调方法 process()

1
2
// WatchedEvent对象封装服务端事件并传递给Watcher
abstract public void process(WatchedEvent event);

WatchedEvent包含每个事件的三个基本属性:通知状态(KeeperState)、事件类型(eventType)和节点路径(path):

相对的WatcherEvent实现了序列号接口,可以用于网络传输:

服务端生成WatchedEvent事件后,会调用getWrapper方法将自己包装成一个可序列化的WatcherEvent事件,通过网络传输给客户端。客户端收到事件对象后会先将WatcherEvent还原为WatchedEvent,并传递给process方法处理。二者对于事件的封装都很简单,比如节点数据变更,客户端无法直接获取到原始数据和变更后数据:

1
2
3
KeeperState: SyncConnected
EventType: NodeDataChanged
Path: /zk-book

Watcher内部各组件间关系:

1.4.2 客户端注册Watcher

注册Watcher:

  • 创建ZK客户端实例时,可以给定一个默认的Watcher,它会一直保存在客户端ZKWatchManager的defaultWatcher中。

    1
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);
  • 客户端也可以通过 getData()getChildren()exist() 三个接口来注册Watcher,例如getData:

    1
    2
    3
    4
    //  通过布尔参数标识是否使用前面的默认Watcher来进行注册
    public byte[] getData(String path, boolean watch, Stat stat);
    // 直接提供Watcher
    public byte[] getData(final String path, Watcher watcher, Stat stat);

客户端首先会对当前客户端请求request进行标记,设置为“使用Watcher监听”,同时封装一个Watcher注册信息WatchRegistration对象,临时保存数据节点的路径和Watcher的关系:

1
2
3
4
5
6
7
8
9
10
public Stat getData(final String path, Watcher watcher, Stat stat) {
...
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
...
request.setWatch(watcher != null);
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}

Packet可以看作是ZK中最小的通信协议单元,用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象。ClientCnxn的WatchRegistration也会封装到Packet中,再放入发送队列中等待客户端发送。

1
2
3
4
5
6
7
8
9
Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) {
Packet packet = null;
...
synchronized(outgoingQueue) {
packet = new Packet(h, r, request, response, watchRegistration);
...
outgoingQueue.add(packet);
}
}

随后,客户端发送请求,同时等待请求返回。完成发送后,由客户端SendThread线程的readResponse方法负责接收来自服务端的响应,finishPacket方法从Packet中取出对应的Watcher并注册到ZKWatchManager中去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void finishPacket(Packet p) {
if (p.watchRegistration != null) {
p.watchRegistration.register(p.replyHeader.getErr());
}
}

protected Map<String, Set<Watcher>> getWatches(int rc) {
return watchManager.dataWatches;
}

// 客户端将暂时保存的Watcher转交给ZKWatchManager,最终保存到dataWatches
public void register(int rc) {
if (shouldAddWatch(rc)) {
Map<String, Set<Watcher>> watches = getWatches(rc);
synchronized(watches) {
Set<Watcher> watchers = watches.get(clientPath);
if (watchers == null) {
watchers = new HashSet<Watcher>();
watchers.put(clientPath, watchers);
}
watchers.add(watcher);
}
}
}

流程图:

极端情况下,客户端每调用一次 getData() 接口,就会注册一个Watcher,但这些Watcher并不会都随着客户端请求发送到服务端造成内存紧张等性能问题。ZK底层实际的网络传输序列化过程中并没有将WatchRegistration对象完全的序列化到底层字节数组中,Packet内部序列化过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public void createBB() {
try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
// We'll fill this in later
boa.writeInt(-1, "len");
if (requestHeader != null) {
requestHeader.serialize(boa, "header");
}
if (request instanceof ConnectRequest) {
request.serialize(boa, "connect");
// append "am-I-allowed-to-be-readonly" flag
boa.wirteBool(readOnly, "readOnly");
} else if (request != null) {
request.serialize(boa, "request");
}
}
...
}

只有requestHeader和request进行了序列化,所以尽管WatchRegistration被封装到了Packet中,但并没有序列化到底层字节数组,因此不会进行网络传输。

1.4.3 服务端处理Watcher

客户端并不会将Watcher对象真正的传递到服务端,服务端如何完成客户端的Watcher注册?

(1)ServerCnxn存储

服务端收到请求后,在 FinalRequestProcessor.processRequest() 中判断当前请求是否需要注册Watcher:

1
2
3
4
5
6
7
case OpCode.getData: {
...
// 当getDtatRequest.getWatch()为True时,将当前ServerCnxn对象和数据节点路径传入getData方法,最终存储在WatchManager的watchTable和watch2Paths中
byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat, getDtatRequest.getWatch() ? cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
}

ServerCnxn是一个ZooKeeper客户端和服务器之间的连接接口,代表了一个二者间的连接。默认实现是NIOServerCnxn,3.4.0版本开始引入了基于Netty的实现NettyServerCnxn。都实现了Watcher的process接口,所以ServerCnxn也可以看作是一个Watcher。

WatchManager是ZK服务端Watcher的管理者,内部两种存储结构对应不同维度:

  • watchTable:从数据节点路径的粒度来托管Watcher。
  • watch2Paths:从Watcher的粒度控制事件触发需要触发的数据节点。

WatchManager还负责Watcher事件的触发,移除掉已被触发的Watcher。服务端DataTree会托管两个WatchManager(dataWatches和childWatches,对应数据变更和子节点变更)。

(2)Watcher触发

服务端如何触发Watcher

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public Stat setData(String path, byte data[], int version, long zxid, long time) throws KeeperException.NoNodeException {
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lasdtdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
......
// 指定节点数据更新后,调用WatchManager的triggerWatch来触发相关事件
dataWatches.triggerWatch(path, EventType.NodeDataChanged);
return s;
}

public Set<Watcher> triggerWatch(String path, EventType type) {
return triggerWatch(path, type, null);
}

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
watchers = watchTable.remove(path);
......
// 如果不存在Watcher,直接返回
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
w.process(e);
}
return watchers;
}

WatchManager的触发逻辑:

  1. 封装WatchedEvent:先将通知状态(KeeperState)、事件类型(EventType)和节点路径封装成一个WatchedEvent对象。

  2. 查询Watcher:根据节点路径从watchTable中取出对应的Watcher,若没找到则说明没有客户端在该节点注册过Watcher,直接退出;若找到了则将其提取出来,同时直接从watchTable和watch2Paths中将其删除(Watcher在服务端是一次性的,用完及删)。

  3. 调用process方法来触发Watcher:逐个调用步骤2找到的Watcher的process方法,即请求中的ServerCnxn对应方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    public class NIOServerCnxn extends ServerCnxn {
    ......
    synchronized public void process(WatchedEvent event) {
    // 请求头标记-1,标识当前是一个通知
    ReplyHeader h -= new ReplyHeader(-1, -1L, 0);
    ......
    // Convert WatchedEvent to a type that can be sent over the write
    // WatchedEvent包装为WatcherEvent方便网络传输序列化
    WatcherEvent e = event.getWrapper();
    // 向客户端发送通知
    sendResponse(h, e, "notification");
    }
    }

    ServerCnxn中的process不包含业务逻辑,真正的Watcher回调和业务逻辑都在客户端

(3)客户端回调Watcher
  • SendThread接收事件通知:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    // ZK客户端如何接收事件通知
    class SendThread extends Thread {
    ...
    // 服务端响应统一处理
    void readResponse(ByteBuffer incomingBuffer) throw IOException {
    ...
    if (replyHdr.getXid() == -1) {
    // -1 means notification 通知类型的响应
    ...
    // 1.将字节流转换为WatcherEvent对象
    WatcherEvent event = new WatcherEvent();
    event.deserialize(bbia, "response");
    // 2.处理chrootPath
    // convert from a server path to a client path
    if (chrootPath != null) {
    // 对服务端传过来的完整字节路径进行处理,生成客户端一个相对节点路径
    String serverPath = event.getPath();
    if (serverPath.compareTo(chrootPath) == 0)
    event.setPath("/");
    else if (serverPath.length() > chrootPath.length())
    event.setPath(serverPath.substring(chrootPath.length()));
    ...
    }
    // 3.还原WatchedEvent,适配process接口参数
    WatchedEvent we = new WatchedEvent(event);
    ...
    // 4.回调Watcher,将WatchedEvent交给EventThread线程,在下一个轮询周期进行Watcher回调
    eventThread.queueEvent(we);
    return;
    }
    ...
    }
    }
  • EventThread处理事件通知:

    SentThread通过 EventThread.queueEvent() 将事件传给EventThread线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    public void queueEvent(WatchedEvent event) {
    if (event.getType() == EventType.None && sessionState == event.getState()) {
    return;
    }
    sessionState = event.getState();
    // materialize the watchers based on the event
    WatcherSetEventPair pair = new WatcherSetEventPair();
    watcher.materialize(event.getState(), event.getType(), event.getPath(), event));
    // queue the pair (watch set & event) for later processing
    // 取出所有相关Watcher后,放入waitingEvents队列
    waitingEvents.add(pair);
    }

    // 根据通知事件,从ZKWatchManager中取出所有相关Watcher
    public Set<Watcher> materialize(Watcher.Event.KeeperState state, Watcher.Event.EventType type, String clientPath) {
    Set<Watcher> result = new HashSet<Watcher>();
    switch (type) {
    ...
    case NodeDataChanged:
    case NodeCreated:
    // 从相应的Watcher存储中去除对应的Watcher,客户端的Watcher也是一次性的
    synchronized (dataWatches) {
    addTo(dataWatches.remove(clientPath), result);
    }
    synchronized (existWatches) {
    addTo(existWatches.remove(clientPath), result);
    }
    break;
    ...
    }
    return result;
    }

    final private void addTo(Set<Watcher> from, Set<Watcher> to) {
    if (from != null) {
    to.addAll(from);
    }
    }

    EventThread的run方法不断对waitingEvents队列进行处理:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    public void run() {
    try {
    isRunning = true;
    while(true) {
    Object event = waitingEvents.take();
    if (event == eventOfDeath) {
    wasKilled = true;
    } else {
    processEvent(event);
    }
    ...
    }
    }
    }

    private void processEvent(Object event) {
    try {
    if (event instanceof WatcherSetEventPair) {
    // each watcher will process the event
    WatcherSetEventPair pair = (WatcherSetEventPair) event;
    // 每次从waitingEvents中取出一个Watcher并进行串行同步处理
    for (Watcher watcher : pair.watchers) {
    try{
    // 调用process完成回调
    watcher.process(pair.event);
    } catch (Throwable t) {
    ...

1.4.4 Watcher特性

  • 一次性:无论是服务端还是客户端,一旦Watcher被触发,就会从ZK存储中删除,所以使用时要反复注册,这样能够减轻服务器的压力。
  • 客户端串行执行:客户端Watcher回调时一个串行同步过程,保证了执行顺序。
  • 轻量:WatchedEvent是Watcher通知机制的最小通知单元,只包含三部分内容(通知状态、事件类型和节点路径),客户端需要主动去获取更新数据。客户端向服务端注册Watcher时,没有把真实Watcher对象传给服务端,只是在请求中使用Boolean类型标记,服务端仅仅保存了当前连接的ServerCnxn对象。

1.5 ACL—保障数据安全

1.5.1 权限模式-Scheme

权限模式用来确定权限验证过程中使用的检验策略,常用的四种权限模式:

  • IP:通过IP地址粒度进行权限控制,也支持按网段的方式进行配置,如 ip:192.168.0.1/24 表示针对 192.168.0.* IP段进行权限控制。

  • Digest:

    • 最常用的模式,类似于 username:password 形式的权限表示进行权限控制来区分不同应用。

    • ZK先后会进行两次编码处理,SHA-1算法加密BASE64编码,由 DigestAuthenticationProvider.generateDigest(String idPassword) 函数进行封装。

      1
      2
      3
      4
      5
      6
      7
      8
      public class DigestAuthenticationProviderUsage {

      public static void main(String[] args) throws NoSuchAlgorithmException {
      System.out.println(DigestAuthenticationProvider.generateDigest("foo:zk-book"));
      // 输出:foo:kWN6aNS.... 混淆无序的字符串
      }

      }
  • World:最开放的模式,数据节点访问对所有用户开放,只有一个权限标识 world:anyone

  • Super:超级用户模式,可以对任意ZK节点进行任意操作。

1.5.2 授权对象-ID

授权对象指权限赋予的用户或一个指定实体,如IP地址或机器:

1.5.3 权限-Permission

权限指通过权限检查后允许执行的操作,分为5类:

  • CREATE:数据节点的创建权限,允许授权对象在该数据节点下创建子节点。
  • DELETE:子节点的删除权限,允许授权对象删除该数据节点的子节点。
  • READ:数据节点的读取权限,允许授权对象访问该数据节点并读取其数据内容或子节点列表等。
  • WRITE:数据节点的更新权限,允许授权对象对该数据节点进行更新操作。
  • ADMIN:数据节点的管理权限,允许授权对象对该数据节点进行ACL相关的设置操作。

1.5.4 自定义扩展

ZK允许开发人员用过指定方式对ZK进行扩展,即 Pluggable ZooKeeper Authentication 机制,实现接口AuthenticationProvider:

1
2
3
4
5
6
7
8
// ZK默认的几种权限模式对应DigestAuthenticationProvider和IPAuthenticationProvider
public interface AuthenticationProvider {
String getScheme();
KeeperException.Code handleAuthentication(ServerCnxn cnxn, byte authData[]);
boolean matches(String id, String aclExpr);
boolean isAuthenticated();
boolean isValid(String id);
}

实现自定义权限控制器后,需要将其注册到ZK中,有两种方式:

  • 系统属性:启动参数添加配置信息

    1
    -Dzookeeper.authProvider.1=com.zkbook.CustomAuthenticationProvider
  • 配置文件:zoo.cfg 中添加配置信息

    1
    authProvider.1=com.zkbook.CustomAuthenticationProvider

权限控制器注册采用了延迟加载的策略,第一次处理包含权限控制请求时才会进行权限控制器的初始化,同时ZK将所有控制器注册到ProviderRegistry中,顺序为:DigestAuthenticationProvider和IPAuthenticationProvider先初始化,然后扫描 zookeeper.authProvider 获取自定义控制器并初始化。

1.5.5 ACL管理

通过zkCli脚本登录ZK服务器后,通过两种方式设置ACL:

  • 数据节点创建的同时进行ACL权限的设置:

    1
    2
    3
    4
    5
    6
    7
    # 命令格式
    create [-s] [-e] path data acl
    # 示例
    $ create -e /zk-book init digest: foo:MiHs3Eiylp......
    Created /zk-book
    $ getAcl /zk-book
    'digest,'foo:MiHs3Eiylp......
  • 使用 setAcl 命令单独对已经存在的数据节点进行ACL设置:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    setAcl path acl
    # 示例
    $ create -e /zk-book init
    Created /zk-book
    $ setAcl /zk-book digest:foo:MiHs3Eiylp......
    cZxid = 0x400000042
    ctime = ......
    mZxid = 0x400000042
    mtime = ......
    pZxid = 0x400000042
    cversion = 0
    dataVersion = 0
    aclVersion = 1
    ephemeralOwner = 0x1472ff49b020003
    dataLength = 4
    numChildren = 0
    $ getAcl /zk-book
    'digest,'foo:MiHs3Eiylp......

一旦对一个数据节点设置了ACL权限控制,没有授权的ZK客户端就无法访问该数据节点,但如果一个持久数据节点包含了ACL权限控制,其创建客户端已不再使用,这些数据节点如何清理?Super模式下使用超级管理员权限。

ZK服务器启动时添加如下系统属性:

1
2
# foo表示管理员名,后续字符串由管理员自主配置
-Dzookeeper.DigestAuthenticationProvider.superDigest=foo:MiHs3Eiylp......

应用中使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class AuthSampleSuper {
final static String PATH = "/zk-book";
public static void main(String[] args) throws Exception {
ZooKeeper zk1 = new ZooKeeper("domain1.book.zookeeper:2181", 5000, null);
zk1.addAuthInfo("digest", "foo:true".getBytes());
zk1.create(PATH, "init".getBytes(), Ids.CREATOR_ALL_ACL, CreateMode.EPHEMERAL);
ZooKeeper zk2 = new ZooKeeper("domain1.book.zookeeper:2181", 50000, null);
zk2.addAuthInfo("digest", "foo:zk-book".getBytes());
System.out.println(zk2.getData(PATH, false, null));
ZooKeeper zk3 = new ZooKeeper("domain1.book.zookeeper:2181", 50000, null);
zk3.addAuthInfo("digest", "foo:false".getBytes());
System.out.println(zk3.getData(PATH, false, null));
}
}
// 输出 org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /zk-book foo:false无法通过权限校验,foo:zk-book超级管理员可以随意进行操作

二. 序列化与协议

网络通信的首要问题是解决数据的序列号和反序列化操作。

2.1 Jute简介

Jute即ZK中的序列号组件,前身是Hadoop Record IO中的序列化组件,后来由于Apache Avro序列化框架有出色的跨语言特性、丰富的数据结构和对MapReduce的天然支持,且非常方便用于RPC调用,所以在0.21.0版本代替了Record IO。

ZooKeeper也曾尝试使用Apache Avro、Thrift或Google的protobuf来替换Jute,但考虑新老版本的兼容而未能推进,并且Jute的序列化能力一直都不是ZK的性能瓶颈。

2.2 Jute使用

使用Jute来对Java对象进行序列化和反序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class MockReqHeader implements Record {
private long sessionId;
private String type;

......

public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(this, tag);
a_.writeLong(sessionId, "sessionId");
a_.writeLong(type, "type");
a_.endRecord(this, tag);
}

public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
a_.startRecord(tag);
sessionId = a_.readLong("sessionId");
type = a_.readLong("type");
a_.endRecord(tag);
}
}

调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
// 开始序列化
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
new MockReqHeader(0x34221eccb92a34el, "ping").serialize(boa, "header");
// 此处通常是TCP网络传输对象
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
// 开始反序列化
ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
BinaryInputArchive bbia = BinaryOutputArchive.getArchive(bbis);
new MockReqHeader();
header2.deserialize(bbia, "header");
bbis.close();
baos.close();

流程:

  1. 实体类实现Record接口的serialize和deserialize方法。
  2. 构建一个序列化器BinaryOutputArchive。
  3. 序列化,调用实体类的serialize方法,将对象序列化到指定tag。
  4. 反序列化,调用实体类的deserialize方法,从指定tag中反序列化出数据内容。

2.3 Jute深入

(1)Record接口

暂无。

(2)OutputArchive和InputArchive

暂无。

2.4 通信协议

ZK基于TCP/IP协议实现通信协议,请求和响应结构如下:

(1)请求和响应结构

  • 请求头(RequestHeader):

    • xid:记录客户端请求发起的先后序号,确保单个客户端请求的响应顺序。
    • type:请求的操作类型,定义在 OpCode 中
      • OpCode.create:创建节点
      • OpCode.delete:删除节点
      • OpCode.getData:获取节点数据
      • ……
  • 请求体(Request):请求的主体,包含所有操作内容。不同请求类型的请求体结构是不同的。

    • 会话创建(ConnectRequest):

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      module org.apache.zookeeper.proto {
      class ConnectRequest {
      // 协议版本号
      int protocolVersion;
      // 最近一次收到的服务器ZXID
      long lastZxidSeen;
      // 会话超时时间
      int timeOut;
      // 会话标识
      long sessionId;
      // 会话密码
      buffer passwd;
      }
      ......
    • 获取节点数据(GetDataRequest):

      1
      2
      3
      4
      5
      6
      7
      8
      module org.apache.zookeeper.proto {
      class GetDataRequest {
      // 数据节点的节点路径
      ustring path;
      // 是否注册Watcher
      boolean watch;
      }
      ......
    • 更新节点数据(SetDataRequest):

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      module org.apache.zookeeper.proto {
      class SetDataRequest {
      // 数据节点的节点路径
      ustring path;
      // 数据内容
      buffer data;
      // 节点数据的期望版本号
      int version;
      }
      ......
  • 响应头(ReplyHeader):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    module org.apache.zookeeper.proto {
    class ReplyHeader {
    // 对应请求发起的先后序号,原值返回
    int xid;
    // ZK服务器上的最新事务ID
    long zxid;
    // 错误码,出现异常时标识出来
    int err;
    }
    ......
  • 响应体(Response):

    • 会话创建(ConnectResponse):

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      module org.apache.zookeeper.proto {
      class ConnectResponse {
      // 协议版本号
      int protocolVersion;
      // 会话超时时间
      int timeOut;
      // 会话标识
      long sessionId;
      // 会话密码
      buffer passwd;
      }
      ......
    • 获取节点数据(GetDataResponse):

      1
      2
      3
      4
      5
      6
      7
      8
      module org.apache.zookeeper.proto {
      class GetDataResponse {
      // 数据内容
      buffer data;
      // 节点状态
      org.apache.zookeeper.data.Stat stat;
      }
      ......
    • 更新节点数据(SetDataResponse):

      1
      2
      3
      4
      5
      6
      module org.apache.zookeeper.proto {
      class SetDataResponse {
      // 节点状态
      org.apache.zookeeper.data.Stat stat;
      }
      ......

(2)实例

请求:

1
2
3
4
5
6
7
8
9
public class ASimpleGetDataRequest implements Watcher {
public static void main(String[] args) throws Exception {
ZooKeeper zk = new ZooKeeper("domain1.book.zookeeper", 5000, new ASimpleGetDataRequest());
// 获取节点数据
zk.getData("/$7_2_4/get_data", true, null);
}

public void process(WatchedEvent event) {}
}

使用Wireshark获取到其发送的网络TCP包。

十六进制表示含义如下:

响应:

十六进制表示含义如下:


参考:

🔗 《从Paxos到Zookeeper-分布式一致性原理与实践》